
In this notebook we finally tackle our larger area. We're going to need better visualisation tools, and ideally we'd avoid bringing the results back to the Jupyter notebook at all, instead leveraging the dask cluster's resources during visualisation. We'll be using dask-aware visualisation libraries (holoviews and datashader) to do the heavy lifting.
Let's begin by starting up our cluster and sizing it appropriately to our computational task.
All the code here is the same as the conclusion of the previous notebook, except we'll make the cluster bigger, with 10 workers instead of 4. We'll also wrap the masking and NDVI calculation in a Python function, since we won't be changing that code from now on.
We'll use the same ROI and time period for this run and we're using all the techniques so far to reduce the computation time:
# Initialize the Gateway client
from dask.distributed import Client
from dask_gateway import Gateway
number_of_workers = 10
gateway = Gateway()
clusters = gateway.list_clusters()
if not clusters:
    print('Creating new cluster. Please wait for this to finish.')
    cluster = gateway.new_cluster()
else:
    print(f'An existing cluster was found. Connecting to: {clusters[0].name}')
    cluster = gateway.connect(clusters[0].name)
cluster.scale(number_of_workers)
client = cluster.get_client()
client
An existing cluster was found. Connecting to: easihub.172abe6beae3457fba2c61f3261247d9
Client-0735f732-0446-11ee-8397-06a3e810ea18
| Connection method: Cluster object | Cluster type: dask_gateway.GatewayCluster |
| Dashboard: https://hub.csiro.easi-eo.solutions/services/dask-gateway/clusters/easihub.172abe6beae3457fba2c61f3261247d9/status |
import pyproj
pyproj.set_use_global_context(True)
import git
import sys, os
from dateutil.parser import parse
from dateutil.relativedelta import relativedelta
from dask.distributed import Client, LocalCluster
import datacube
from datacube.utils import masking
from datacube.utils.aws import configure_s3_access
# EASI defaults
os.environ['USE_PYGEOS'] = '0'
repo = git.Repo('.', search_parent_directories=True).working_tree_dir
if repo not in sys.path: sys.path.append(repo)
from easi_tools import EasiDefaults, notebook_utils
easi = EasiDefaults()
Successfully found configuration for deployment "csiro"
dc = datacube.Datacube()
configure_s3_access(aws_unsigned=False, requester_pays=True, client=client);
# Get the centroid of the coordinates of the default extents
central_lat = sum(easi.latitude)/2
central_lon = sum(easi.longitude)/2
# central_lat = -42.019
# central_lon = 146.615
# Set the buffer to load around the central coordinates.
# This is a radial distance, so the bounding box spans 2x the buffer in each dimension.
buffer = 0.8
# Compute the bounding box for the study area
study_area_lat = (central_lat - buffer, central_lat + buffer)
study_area_lon = (central_lon - buffer, central_lon + buffer)
# Data product
products = easi.product('landsat')
# Set the date range to load data over
set_time = easi.time
set_time = (set_time[0], parse(set_time[0]) + relativedelta(years=1))
# set_time = ("2021-01-01", "2021-12-31")
# Selected measurement names (used in this notebook)
nir = easi.nir('landsat') # If defined, else None
red = easi.red('landsat') # If defined, else None
if nir is None: nir = 'nir08' # USGS Landsat
if red is None: red = 'red' # USGS Landsat
# Set the QA band name and mask values
good_pixel_flags = {  # USGS Landsat
    'nodata': False,
    'cloud': 'not_high_confidence',
    'cloud_shadow': 'not_high_confidence',
    'water': 'land_or_cloud',
}
qa_band = easi.qa_band('landsat') # If defined, else None
qa_mask = easi.qa_mask('landsat') # If defined, else None
if qa_band is None: qa_band = 'qa_pixel' # USGS Landsat
if qa_mask is None: qa_mask = good_pixel_flags
# Set the measurements/bands to load. `None` will load all of them
measurements = [qa_band, red, nir]
# Set the resampling method for the bands
resampling = {qa_band: "nearest", "*": "average"}
# Set the coordinate reference system and output resolution
set_crs = easi.crs('landsat') # If defined, else None
set_resolution = easi.resolution('landsat') # If defined, else None
# set_crs = "epsg:3577"
# set_resolution = (-30, 30)
# Set the scene group_by method
group_by = "solar_day"
def masked_seasonal_ndvi(dataset):
    # Identify good pixels according to the QA band and mask flags
    cloud_free_mask = masking.make_mask(dataset[qa_band], **qa_mask)
    # Apply the mask
    cloud_free = dataset.where(cloud_free_mask)
    # Calculate the components that make up the NDVI calculation
    band_diff = cloud_free[nir] - cloud_free[red]
    band_sum = cloud_free[nir] + cloud_free[red]
    # Calculate NDVI
    ndvi = band_diff / band_sum
    return ndvi.groupby("time.season").mean("time")  # Calculate the seasonal mean
dataset = None # clear results from any previous runs
dataset = dc.load(
    product=products,
    x=study_area_lon,
    y=study_area_lat,
    time=set_time,
    measurements=measurements,
    resampling=resampling,
    output_crs=set_crs,
    resolution=set_resolution,
    dask_chunks={"time": 2, "x": 3072, "y": 3072},
    group_by=group_by,
)
ndvi_unweighted = masked_seasonal_ndvi(dataset)
print(f"dataset size (GiB) {dataset.nbytes / 2**30:.2f}")
print(f"ndvi_unweighted size (GiB) {ndvi_unweighted.nbytes / 2**30:.2f}")
dataset size (GiB) 7.05
ndvi_unweighted size (GiB) 1.05
client.wait_for_workers(n_workers=10)
%%time
actual_result = ndvi_unweighted.compute()
CPU times: user 1.71 s, sys: 785 ms, total: 2.49 s
Wall time: 44 s
You'll notice the computation time is slightly faster with more workers: we're IO bound, so more workers means more available IO bandwidth and threads. It's not 2-3x faster though; we're wasting a lot of resources because we can't actually use all of that extra power.
Tip: More isn't always better. Be mindful of your computational resource usage and cost. A cluster of this size is a tremendous waste for a computational job of this size. Size things appropriately.
And visualise the result
actual_result.sel(season='DJF').plot()
<matplotlib.collections.QuadMesh at 0x7f484c82bfd0>
We'll save the coordinates of this section from the array (as slices) so we can use them later for visualising the same ROI from a larger dataset.
x_slice = slice(ndvi_unweighted.x[0], ndvi_unweighted.x[-1])
y_slice = slice(ndvi_unweighted.y[0], ndvi_unweighted.y[-1])
Let's change the area extent to about 4 degrees square.
# Set the buffer to load around the central coordinates
buffer = 2
# Compute the bounding box for the study area
study_area_lat = (central_lat - buffer, central_lat + buffer)
study_area_lon = (central_lon - buffer, central_lon + buffer)
dataset = None # clear results from any previous runs
dataset = dc.load(
    product=products,
    x=study_area_lon,
    y=study_area_lat,
    time=set_time,
    measurements=measurements,
    resampling=resampling,
    output_crs=set_crs,
    resolution=set_resolution,
    dask_chunks={"time": 2, "x": 3072, "y": 3072},
    group_by=group_by,
)
ndvi_unweighted = masked_seasonal_ndvi(dataset)
Before we compute anything let's take a look at our result's shape and size
print(f"dataset size (GiB) {dataset.nbytes / 2**30:.2f}")
print(f"ndvi_unweighted size (GiB) {ndvi_unweighted.nbytes / 2**30:.2f}")
dataset size (GiB) 90.18
ndvi_unweighted size (GiB) 6.56
An order of magnitude larger for the results.
The result is getting on the large side for the notebook node, so we will need to pay attention to data locality and the size of the results being processed. The cluster has a LOT more memory than the notebook node; bring too much back to the notebook and the notebook will crash.
Tip: Be mindful of the size of the results and their data locality.
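The tip above can be made concrete with a small guard before calling .compute(): check the materialised size via the nbytes attribute, which numpy, dask and xarray objects all expose. This is a hypothetical sketch (fits_locally is not part of the notebook):

```python
import numpy as np

def fits_locally(obj, limit_gib=4.0):
    """Return True if fully materialising `obj` would stay under `limit_gib` GiB.

    Works with anything exposing `nbytes`: numpy arrays, dask arrays,
    and xarray Datasets/DataArrays.
    """
    return obj.nbytes / 2**30 <= limit_gib

a = np.zeros((1024, 1024), dtype=np.float64)  # 8 MiB
print(fits_locally(a))                   # True: well under the 4 GiB default
print(fits_locally(a, limit_gib=0.001))  # False: 8 MiB exceeds a ~1 MiB limit
```

Applied here, such a check on the 6.56 GiB ndvi_unweighted result would flag it as too large to bring back to the notebook in one piece.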
Now let's check the shape, tasks and chunks
dataset
<xarray.Dataset>
Dimensions: (time: 88, y: 16097, x: 13672)
Coordinates:
* time (time) datetime64[ns] 2020-02-01T23:50:22.661832 ... 2021-01...
* y (y) float64 -3.777e+06 -3.778e+06 ... -4.26e+06 -4.26e+06
* x (x) float64 1.151e+06 1.152e+06 ... 1.562e+06 1.562e+06
spatial_ref int32 3577
Data variables:
oa_fmask (time, y, x) uint8 dask.array<chunksize=(2, 3072, 3072), meta=np.ndarray>
nbart_red (time, y, x) int16 dask.array<chunksize=(2, 3072, 3072), meta=np.ndarray>
nbart_nir (time, y, x) int16 dask.array<chunksize=(2, 3072, 3072), meta=np.ndarray>
Attributes:
crs: EPSG:3577
grid_mapping: spatial_ref

Looking at the red data variable we can see about 50 GiB for the array, 36 MiB per chunk and 5526 tasks. Note that the nir and qa bands will be similar in shape and size.
The number of tasks is climbing, so we can expect an increase in task graph optimisation time.
Chunk size and tasks seem okay, but we will monitor the dask dashboard in case temporaries cause workers to spill to disk when memory gets too full.
The chunking is producing some slivers, particularly on the y axis. Let's modify the y chunk size so these slivers don't exist, since they blow out the task count and are likely unnecessary. Five chunks vertically is a close fit, so let's expand the chunk size and see what happens. We will need to check the chunk size afterwards to make sure it doesn't get too large; if it does, we can instead make the chunks smaller to reduce slivers.
from math import ceil
y_chunks = ceil(dataset.dims['y']/5)
y_chunks
3220
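The "close fit" choice above can be generalised into a small helper: pick the chunk count closest to the target chunk size, then split the dimension as evenly as possible. This is a hypothetical sketch (even_chunk is not part of the notebook):

```python
from math import ceil

def even_chunk(length, target):
    """Chunk size near `target` that divides `length` into near-equal pieces,
    avoiding sliver chunks at the edge of the dimension."""
    pieces = max(1, round(length / target))
    return ceil(length / pieces)

print(even_chunk(16097, 3072))  # 3220, matching the manual choice above
print(even_chunk(6144, 3072))   # 3072, already an exact fit
```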
dataset = None # clear results from any previous runs
dataset = dc.load(
    product=products,
    x=study_area_lon,
    y=study_area_lat,
    time=set_time,
    measurements=measurements,
    resampling=resampling,
    output_crs=set_crs,
    resolution=set_resolution,
    dask_chunks={"time": 2, "x": 3072, "y": y_chunks},
    group_by=group_by,
)
ndvi_unweighted = masked_seasonal_ndvi(dataset)
Now recheck our chunk size and tasks for red
dataset
<xarray.Dataset>
Dimensions: (time: 88, y: 16097, x: 13672)
Coordinates:
* time (time) datetime64[ns] 2020-02-01T23:50:22.661832 ... 2021-01...
* y (y) float64 -3.777e+06 -3.778e+06 ... -4.26e+06 -4.26e+06
* x (x) float64 1.151e+06 1.152e+06 ... 1.562e+06 1.562e+06
spatial_ref int32 3577
Data variables:
oa_fmask (time, y, x) uint8 dask.array<chunksize=(2, 3220, 3072), meta=np.ndarray>
nbart_red (time, y, x) int16 dask.array<chunksize=(2, 3220, 3072), meta=np.ndarray>
nbart_nir (time, y, x) int16 dask.array<chunksize=(2, 3220, 3072), meta=np.ndarray>
Attributes:
crs: EPSG:3577
grid_mapping: spatial_ref

A very marginal increase in memory per chunk, but the task count has dropped from 5526 to 4661. Note that this saving applies to every measurement and operation, so the benefit is significant.
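The effect of removing the sliver can be sanity-checked with plain dask arrays of the same spatial shape; this sketch counts only the block grid (one creation task per chunk), not the full load graph:

```python
import dask.array as da

shape = (16097, 13672)  # the y/x shape of the dataset above

# Original chunking: a 737-row sliver on y gives a 6 x 5 = 30 block grid
a = da.zeros(shape, chunks=(3072, 3072), dtype="int16")

# Adjusted chunking: five near-even y chunks give a 5 x 5 = 25 block grid
b = da.zeros(shape, chunks=(3220, 3072), dtype="int16")

print(a.numblocks, b.numblocks)  # (6, 5) (5, 5)
```

Multiplied across 44 time chunks, three measurements and every downstream operation, those extra sliver blocks account for the task-count difference seen above.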
ndvi_unweighted
<xarray.DataArray (season: 4, y: 16097, x: 13672)>
dask.array<stack, shape=(4, 16097, 13672), dtype=float64, chunksize=(1, 3220, 3072), chunktype=numpy.ndarray>
Coordinates:
* y (y) float64 -3.777e+06 -3.778e+06 ... -4.26e+06 -4.26e+06
* x (x) float64 1.151e+06 1.152e+06 ... 1.562e+06 1.562e+06
spatial_ref int32 3577
* season (season) object 'DJF' 'JJA' 'MAM' 'SON'

The total task count is under 100,000, so it should be okay, but task graph optimisation will take a while. The resulting array is a bit big for the notebook node, as noted previously.
The spatial shape is y: 16097, x: 13672. Standard plots aren't going to work well for visualising the result in the notebook, and the result uses a fair amount of memory, so we'll need a different approach.
For now let's visualize the same ROI as the small area before. We stashed that ROI in x_slice, y_slice.
If you haven't already, open the dask dashboard so you can watch the cluster make progress.
The code to do this visualisation is basically the same as before, except now we specify a slice:
%%time
ndvi_unweighted.sel(season='DJF', x=x_slice, y=y_slice).compute().plot()
CPU times: user 3.93 s, sys: 532 ms, total: 4.47 s
Wall time: 34.7 s
<matplotlib.collections.QuadMesh at 0x7f48445c29e0>
The computation time is relatively short since we are only materialising the result for a subset of the overall dataset.
To visualise all of the data we will make use of the dask cluster and some dask-aware visualisation capabilities from the holoviews and datashader Python libraries. These libraries provide interactive visualisation that leaves the large datasets on the cluster and transmits only the final visualisation to the Jupyter notebook. This is done on the fly, so the user can zoom and pan around the dataset in all dimensions and the dask cluster will scale the data to fit the viewport automatically. The details of how this is done, and the advanced features available, are beyond the scope of this dask and ODC course, but the manuals are extensive and the basic example here is both powerful and useful.
Tip: The datashader pipeline page provides an excellent summary of what's going on.
compute() and persist()
The first thing we will do is persist() the results of our calculation to the cluster. This will materialise the results but keep them on the cluster: all lazy tasks are calculated, just like compute(), but data locality remains on the cluster. This ensures the result is readily available for visualisation. The cluster has plenty of (distributed) memory, so there is no reason not to materialise the result there.
persist() is non-blocking, so it will return as soon as task graph optimisation (which is performed in the notebook kernel) is complete. Run the next cell and you will see it takes a few seconds to do the task graph optimisation; once that is complete, the Jupyter notebook will be available for use again. Meanwhile, the dask dashboard will show tasks running as the result is computed and left on the cluster.
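The contrast between the two calls can be sketched with plain dask arrays on the default threaded scheduler (an illustration only; with a distributed client, persist() keeps the materialised chunks in worker memory on the cluster rather than in the local process):

```python
import dask.array as da

x = da.random.random((2000, 2000), chunks=(500, 500))
lazy = (2 * x).mean()       # nothing computed yet

persisted = lazy.persist()  # still a dask collection, but the chunks
                            # behind it are now materialised
concrete = lazy.compute()   # a concrete numpy value in the calling process

print(type(persisted).__name__)  # Array (a dask collection)
print(float(concrete))           # ~1.0, the mean of 2 * Uniform(0, 1)
```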
%%time
on_cluster_result = ndvi_unweighted.persist()
CPU times: user 1.19 s, sys: 23.8 ms, total: 1.22 s
Wall time: 1.2 s
The on_cluster_result will continue to show as a dask array on the cluster - not actual results. Think of it as a handle that links the Jupyter client to the result on the dask cluster.
on_cluster_result
<xarray.DataArray (season: 4, y: 16097, x: 13672)>
dask.array<stack, shape=(4, 16097, 13672), dtype=float64, chunksize=(1, 3220, 3072), chunktype=numpy.ndarray>
Coordinates:
* y (y) float64 -3.777e+06 -3.778e+06 ... -4.26e+06 -4.26e+06
* x (x) float64 1.151e+06 1.152e+06 ... 1.562e+06 1.562e+06
spatial_ref int32 3577
* season (season) object 'DJF' 'JJA' 'MAM' 'SON'

The cluster will go off and do the computation, but we can continue here in the meantime. Let's import holoviews and datashader. We'll be using datashader's rasterize to scale the full dataset, which is 16097 x 13672 pixels, down to the roughly 800-pixel-wide view displayed in the notebook.
Notice also there are no bounds set on the dataset, we are viewing the entire result, including the season dimension. holoviews will provide an interface for pan, zoom and time (season) selection and you can use the mouse to move around the data.
import holoviews as hv
import xarray as xr
from holoviews import opts
from holoviews.operation.datashader import rasterize
hv.extension("bokeh", width=800)
holoviews expects the xarray.DataArray to have a name, so let's give it one via the name attribute:
on_cluster_result.name = "ndvi"
on_cluster_result
<xarray.DataArray 'ndvi' (season: 4, y: 16097, x: 13672)>
dask.array<stack, shape=(4, 16097, 13672), dtype=float64, chunksize=(1, 3220, 3072), chunktype=numpy.ndarray>
Coordinates:
* y (y) float64 -3.777e+06 -3.778e+06 ... -4.26e+06 -4.26e+06
* x (x) float64 1.151e+06 1.152e+06 ... 1.562e+06 1.562e+06
spatial_ref int32 3577
* season (season) object 'DJF' 'JJA' 'MAM' 'SON'

aspect = on_cluster_result.sizes['y'] / on_cluster_result.sizes['x']
width = 800
height = int(width*aspect)
# holoviews works better with Datasets, so convert the DataArray holding NDVI into a Dataset
ndvi_seasonal_mean_ds = on_cluster_result.to_dataset(name="ndvi_seasonal_mean")
The next cell will display the result when it's ready. The rasterize function calculates a representative pixel for display from the full array on the dask cluster. If you monitor the dashboard you will see small bursts of activity across the workers, and some waiting while data transfers bring the summary information back to the Jupyter notebook.
You can use the controls on the right to pan and zoom around the full image. If you zoom in, rasterize will take a moment to compute a new summary for the current zoom level and show more or less detail. Similarly for panning.
hv_ds = hv.Dataset(ndvi_seasonal_mean_ds)
rasterize(
    hv_ds.to(hv.Image, ["x", "y"], "ndvi_seasonal_mean", ["season"]).opts(
        title="NDVI Seasonal Mean",
        cmap="RdYlGn",     # NDVI: the greener, the larger the value
        clim=(-0.5, 1.0),  # clamp the range to enhance the visualisation
        colorbar=True,
        width=width,
        height=height,
    ),
    precompute=True,
)
Disconnecting your client is good practice, but the cluster will still be up, so we need to shut it down as well:
client.close()
cluster.shutdown()